package com.ctec.listener;


import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class TopicMessageListen {

    /**
     * 订阅 kafka中数据引接的任务
     * @param message
     * @param ack
     */

    @KafkaListener(topics = "test", groupId = "defaultConsumerGroup")
    public void syncUserByKafKa(String message, Acknowledgment ack) {
        try {
            // 调用具体的执行方法
            log.info("接收消息：{}",message);
            // 提交kafka消费位移
            ack.acknowledge();
        } catch (Exception e) {
            log.error("失败:" + e.getMessage() + "消息：" + message);
        } finally {
            // 提交kafka消费位移
            ack.acknowledge();
        }

    }


}
